Data Refresh

Load libraries

library(modeltime)
library(dplyr)
library(EIAapi)
library(jsonlite)
library(gt)
library(plotly)
library(lubridate)
source("../pipeline/eia_data.R")
source("../pipeline/backtesting.R")

API Settings:

# Load the pipeline settings and flatten the series definitions into a
# lookup table with one row per sub-balancing authority (sub-BA)
meta_json <- read_json(path = "../settings/settings.json")
s <- meta_json$series
# seq_along() is safe for empty input, unlike 1:length(s)
series <- lapply(seq_along(s), function(i) {
    data.frame(
        parent_id = s[[i]]$parent_id,
        parent_name = s[[i]]$parent_name,
        subba_id = s[[i]]$subba_id,
        subba_name = s[[i]]$subba_name
    )
}) |>
    bind_rows()


# Template for the EIA API facets argument; parent/subba are filled in per
# series inside the refresh loop below
facets_template <- list(
    parent = NULL,
    subba = NULL
)

# Number of observations to request per API call (pagination size)
offset <- 2250

# The EIA API key is read from the environment; it is never hard-coded
eia_api_key <- Sys.getenv("EIA_API_KEY")

# API endpoint and local file paths, as defined in settings.json
api_path <- meta_json$api_path
meta_path <- meta_json$meta_path
data_path <- meta_json$data_path
forecast_path <- meta_json$forecast_path
forecast_log_path <- meta_json$forecast_log_path
calibrated_models_path <- meta_json$calibrated_models_path

# Backtesting settings: forecast horizon, lag features, and training length
h <- meta_json$backtesting$h
lags <- meta_json$backtesting$features$lags |> unlist()
train_length <- meta_json$train_length
# Pull the request metadata: last timestamp on record per series vs. what
# the API currently has available
meta_obj <- get_metadata(api_key = eia_api_key, api_path = api_path, meta_path = meta_path, series = series)

# Display the request metadata table
gt(meta_obj$request_meta)
parent subba end_act request_start end updates_available
CISO PGAE 2024-07-08 07:00:00 2024-07-08 08:00:00 2024-07-08 07:00:00 FALSE
CISO SCE 2024-07-08 07:00:00 2024-07-08 08:00:00 2024-07-08 07:00:00 FALSE
CISO SDGE 2024-07-08 07:00:00 2024-07-08 08:00:00 2024-07-08 07:00:00 FALSE
CISO VEA 2024-07-08 07:00:00 2024-07-08 08:00:00 2024-07-08 07:00:00 FALSE
# Convenience handle for the request table and the next metadata row index
m <- meta_obj$request_meta
index <- meta_obj$last_index + 1

# Accumulators for the newly pulled observations and their metadata rows
data <- NULL
meta_new <- NULL

# For each sub-BA with updates available: pull the new observations,
# regularize them onto a complete hourly grid, append them to the local
# dataset, and record a metadata row describing the outcome
for (i in seq_len(nrow(m))) {
    facets <- facets_template
    facets$parent <- m$parent[i]
    facets$subba <- m$subba[i]
    start <- m$request_start[i]
    end <- m$end[i]
    print(paste(facets$parent, facets$subba, sep = " - "))

    if (m$updates_available[i]) {
        # Request a 24-hour buffer on each side and trim afterwards, so
        # boundary observations are not lost to API time-window quirks
        temp <- eia_backfill(
            start = start - lubridate::hours(24),
            end = end + lubridate::hours(24),
            offset = offset,
            api_key = eia_api_key,
            api_path = paste0(api_path, "data"),
            facets = facets
        ) |> dplyr::filter(time >= start & time <= end)

        # Left-join onto a complete hourly index so gaps become explicit NA
        # rows. Use a dedicated name instead of reusing `index` from above,
        # which the original code silently clobbered.
        hourly_grid <- seq.POSIXt(from = start, to = end, by = "hour")
        ts_obj <- data.frame(period = hourly_grid) |>
            left_join(temp, by = c("period" = "time"))
    } else {
        ts_obj <- NULL
        print("No new data is available")
    }

    # Build the log record for this series; when nothing was pulled the
    # series identifiers must be filled in manually
    meta_temp <- create_metadata(data = ts_obj, start = start, end = end, type = "refresh")

    if (is.null(ts_obj)) {
        meta_temp$parent <- m$parent[i]
        meta_temp$subba <- m$subba[i]
    }

    if (meta_temp$success) {
        print("Append the new data")
        d <- append_data(data_path = data_path, new_data = ts_obj, save = TRUE)
        meta_temp$update <- TRUE
    } else {
        meta_temp$update <- FALSE
        meta_temp$comments <- paste0(meta_temp$comments, "The data refresh failed, please check the log; ")
    }
    meta_temp$index <- NA
    meta_df <- as.data.frame(meta_temp)
    if (!is.null(ts_obj)) {
        data <- bind_rows(data, ts_obj)
    }
    meta_new <- bind_rows(meta_new, meta_df)
}
[1] "CISO - PGAE"
[1] "No new data is available"
[1] "CISO - SCE"
[1] "No new data is available"
[1] "CISO - SDGE"
[1] "No new data is available"
[1] "CISO - VEA"
[1] "No new data is available"
# Display the metadata log for this refresh run
gt(meta_new)
index parent subba time start end start_act end_act start_match end_match n_obs na type update success comments
NA CISO PGAE 2024-07-09 03:32:46.54045 2024-07-08 08:00:00 2024-07-08 07:00:00 NA NA NA NA NA NA refresh FALSE FALSE No new data is available; The data refresh failed, please check the log;
NA CISO SCE 2024-07-09 03:32:46.541495 2024-07-08 08:00:00 2024-07-08 07:00:00 NA NA NA NA NA NA refresh FALSE FALSE No new data is available; The data refresh failed, please check the log;
NA CISO SDGE 2024-07-09 03:32:46.542286 2024-07-08 08:00:00 2024-07-08 07:00:00 NA NA NA NA NA NA refresh FALSE FALSE No new data is available; The data refresh failed, please check the log;
NA CISO VEA 2024-07-09 03:32:46.542929 2024-07-08 08:00:00 2024-07-08 07:00:00 NA NA NA NA NA NA refresh FALSE FALSE No new data is available; The data refresh failed, please check the log;
# Append this run's metadata rows to the metadata file on disk
meta_updated <- append_metadata(meta_path = meta_path, new_meta = meta_new, save = TRUE, init = FALSE)
[1] "Saving the metadata file"

Plot the Series

We will use Plotly to visualize the series:

# Plot the newly pulled observations, if this run retrieved any
if (is.null(data)) {
    print("No new data is available")
} else {
    d <- data |> arrange(subba, period)

    p <- plot_ly(
        d,
        x = ~period,
        y = ~value,
        color = ~subba,
        type = "scatter",
        mode = "lines"
    )

    p
}
[1] "No new data is available"
# Reload the full dataset from disk with explicit column types, then add the
# trend and seasonal features used by the forecasting models
data <- readr::read_csv(file = data_path, col_types = readr::cols(
    period = readr::col_datetime(format = ""),
    subba = readr::col_character(),
    subba_name = readr::col_character(),
    parent = readr::col_character(),
    parent_name = readr::col_character(),
    value = readr::col_double(),
    value_units = readr::col_character()
)) |>
    add_trend(index = "period") |>
    add_seasonal(index = "period")


# Plot the full series, colored by sub-BA
p <- plot_ly(data, x = ~period, y = ~value, color = ~subba, type = "scatter", mode = "lines")

p

Refresh the forecast

#' Refresh the forecast for series with new observations
#'
#' Joins each calibrated model's last successful forecast window (from the
#' forecast log) and the series' most recent observation, then re-forecasts
#' only the series whose data extends past their last forecast end.
#'
#' @param input Data frame with the series (`subba`, the `index` time column,
#'   and the `var` value column).
#' @param forecast_log_path Path to the forecast log file.
#' @param forecast_path Path to the forecast archive file.
#' @param calibrated_models_path Path to the calibrated models RDS file.
#' @param h Forecast horizon.
#' @param index Name of the time index column (string).
#' @param var Name of the value column (string).
#' @param train_length Training window length in hours.
#' @param lags Lag features to construct.
#' @param init Initialize the log/forecast files instead of appending?
#' @param save Save the log and the forecast to disk?
#' @return The forecast object, or NULL (invisibly) when nothing refreshed.
refresh_forecast <- function(
    input,
    forecast_log_path,
    forecast_path,
    calibrated_models_path,
    h,
    index,
    var,
    train_length = 24 * 31 * 25,
    lags,
    init = FALSE,
    save = FALSE) {
    # Bug fix: the original initialized `fc` but checked `forecast` at the
    # end, which raised "object 'forecast' not found" when no refresh ran
    forecast <- NULL
    input <- input |>
        dplyr::select(subba, !!rlang::sym(index), y = !!rlang::sym(var)) |>
        add_trend(index = index) |>
        add_seasonal(index = index)

    # Most recent observation time per series
    input_last_point <- input |>
        dplyr::group_by(subba) |>
        dplyr::filter(!!rlang::sym(index) == max(!!rlang::sym(index))) |>
        dplyr::ungroup() |>
        dplyr::select(subba, last_time = !!rlang::sym(index))

    log <- load_forecast_log(forecast_log_path = forecast_log_path)

    # Attach each model's last successful forecast end and the series' last
    # observation; flag for refresh when new data extends past the forecast.
    # `refresh` is NA for models with no successful log entry.
    calibrated_models <- readRDS(calibrated_models_path) |>
        dplyr::left_join(
            log |>
                dplyr::filter(success) |>
                dplyr::group_by(subba) |>
                dplyr::filter(end == max(end)) |>
                dplyr::select(subba, method, end),
            by = c("subba", "method")
        ) |>
        dplyr::left_join(input_last_point, by = "subba") |>
        dplyr::mutate(refresh = last_time > end)

    # na.rm = TRUE guards the scalar `if` against NA refresh flags, which
    # would otherwise raise a "missing value where TRUE/FALSE needed" error
    if (!any(calibrated_models$refresh, na.rm = TRUE)) {
        message("No new data is available to refresh the forecast")
    } else {
        message("New data is avaiable, starting the forecast refresh process")
        # filter() drops both FALSE and NA refresh flags
        calibrated_models <- calibrated_models |> dplyr::filter(refresh)

        # Collect each model's training window, then bind once — avoids
        # growing a data frame with rbind() inside the loop
        windows <- vector("list", nrow(calibrated_models))
        for (i in seq_len(nrow(calibrated_models))) {
            end <- calibrated_models$end[i]
            start <- end - lubridate::hours(train_length)

            windows[[i]] <- input |>
                dplyr::filter(
                    subba == calibrated_models$subba[i],
                    !!rlang::sym(index) >= start & !!rlang::sym(index) <= end
                )
        }
        d <- dplyr::bind_rows(windows)

        forecast <- create_forecast_subba(
            input = d,
            selected_models = calibrated_models,
            h = h,
            index = index,
            var = "y",
            lags = lags
        )

        create_forecast_log(
            forecast = forecast,
            forecast_log_path = forecast_log_path,
            h = h,
            init = init,
            save = save
        )

        # Archive the forecast only for series with a prior logged success
        subba_success <- log$subba[which(log$success)]

        if (length(subba_success) > 0) {
            forecast_save <- forecast |> dplyr::filter(subba %in% subba_success)
            save_forecast(forecast = forecast_save, forecast_path = forecast_path, init = init, save = save)
        }
    }

    if (!is.null(forecast)) {
        return(forecast)
    }
}



# Run the forecast refresh over the reloaded dataset; appends to the existing
# log/forecast files (init = FALSE) and persists the results (save = TRUE)
fc <- refresh_forecast(
    input = data,
    forecast_log_path = forecast_log_path,
    forecast_path = forecast_path,
    calibrated_models_path = calibrated_models_path,
    h = h,
    index = "period",
    var = "value",
    train_length = 24 * 31 * 25,
    lags = lags,
    init = FALSE,
    save = TRUE
)
New data is avaiable, starting the forecast refresh process
Add lag: 24
Add lag: 25
Add lag: 26
Add lag: 27
Add lag: 28
Add lag: 48
Add lag: 72
Add lag: 8760
Add lag: 24
Add lag: 25
Add lag: 26
Add lag: 27
Add lag: 28
Add lag: 48
Add lag: 72
Add lag: 8760
Add lag: 24
Add lag: 25
Add lag: 26
Add lag: 27
Add lag: 28
Add lag: 48
Add lag: 72
Add lag: 8760
Add lag: 24
Add lag: 25
Add lag: 26
Add lag: 27
Add lag: 28
Add lag: 48
Add lag: 72
Add lag: 8760
Load archive forecast and append new forecast
Save the forecast to ../data/forecast.csv
fc
# A tibble: 96 × 8
   time                subba method model   yhat  lower  upper forecast_label
   <dttm>              <chr> <chr>  <chr>  <dbl>  <dbl>  <dbl> <chr>         
 1 2024-07-08 00:00:00 PGAE  model6 LM    16372. 14232. 18512. 2024-07-08    
 2 2024-07-08 01:00:00 PGAE  model6 LM    17178. 15038. 19319. 2024-07-08    
 3 2024-07-08 02:00:00 PGAE  model6 LM    18109. 15969. 20249. 2024-07-08    
 4 2024-07-08 03:00:00 PGAE  model6 LM    18825. 16685. 20965. 2024-07-08    
 5 2024-07-08 04:00:00 PGAE  model6 LM    18971. 16831. 21111. 2024-07-08    
 6 2024-07-08 05:00:00 PGAE  model6 LM    18535. 16395. 20675. 2024-07-08    
 7 2024-07-08 06:00:00 PGAE  model6 LM    17833. 15693. 19973. 2024-07-08    
 8 2024-07-08 07:00:00 PGAE  model6 LM    16629. 14489. 18769. 2024-07-08    
 9 2024-07-08 08:00:00 PGAE  model6 LM    15521. 13381. 17662. 2024-07-08    
10 2024-07-08 09:00:00 PGAE  model6 LM    14554. 12414. 16694. 2024-07-08    
# ℹ 86 more rows
# Plot the refreshed forecast against the last 3 days of actuals
plot_forecast(
    input = data,
    forecast = fc,
    var = "value",
    index = "period",
    hours = 24 * 3
)